---
title: Pipelines | Dagster
description: A pipeline is a set of solids with explicit data dependencies on each other, creating a directed acyclic graph, or DAG.
---

# Pipeline

A pipeline is a set of solids with explicit data dependencies on each other, creating a directed acyclic graph, or DAG.

![Pipeline Diagram](/images/pipelines.png)

## Relevant APIs

| Name                                                                    | Description                                                                                                                                                                                                        |
| ----------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| <PyObject module="dagster" object="pipeline" displayText="@pipeline" /> | The decorator used to define a pipeline.                                                                                                                                                                           |
| <PyObject module="dagster" object="PipelineDefinition" />               | Base class for solids. You almost never want to use initialize this class directly. Instead, you should use the <PyObject object="pipeline" decorator /> which returns a <PyObject object="PipelineDefinition"  /> |
| <PyObject module="dagster" object="ModeDefinition" />                   | Modes allow you to vary pipeline behavior between different deployment environments. For more info, see the [Modes](...) section                                                                                   |
| <PyObject module="dagster" object="PresetDefinition" />                 | Presets allow you to predefine pipeline configurations.                                                                                                                                                            |

## Overview

Solids are linked together into pipelines by defining the dependencies between their inputs and outputs. An important difference between Dagster and other workflow systems is that in Dagster, solids dependencies are expressed as data dependencies instead of just the order solids should execute in.

This difference enables Dagster to support richer modeling of dependencies. Instead of merely ensuring that the order of execution is correct, dependencies in Dagster provide a variety of compile and run-time checks.

Dependencies are expressed in Pipelines using Dagster's function invocation DSL.

---

## Defining a pipeline

To define a pipeline, use the <PyObject object="pipeline" decorator /> decorator.

Within the decorated function body, we use function calls to indicate the dependency structure between the solids making up the pipeline.

In this example, the `add_one` solid depends on the `return_one` solid's output. Because this data dependency exists, the `return_one` solid executes after `add_one` runs successfully and emits the required output.

```python file=/concepts/solids_pipelines/pipelines.py startafter=start_pipeline_example_marker endbefore=end_pipeline_example_marker
@solid
def return_one(context):
    return 1


@solid(input_defs=[InputDefinition("number", int)])
def add_one(context, number):
    return number + 1


@pipeline
def one_plus_one_pipeline():
    add_one(return_one())
```

### Aliases and Tags

### Solid aliases

You can use the same solid definition multiple times in the same pipeline.

```python file=/concepts/solids_pipelines/pipelines.py startafter=start_multiple_usage_pipeline endbefore=end_multiple_usage_pipeline
@pipeline
def multiple_usage_pipeline():
    add_one(add_one(return_one()))
```

To differentiate between the two invocations of `add_one` Dagster automatically aliases the solid names to be `add_one` and `add_one_2`.

You can also manually define the alias by using the `.alias` method on the solid invocation.

```python file=/concepts/solids_pipelines/pipelines.py startafter=start_alias_pipeline endbefore=end_alias_pipeline
@pipeline
def alias_pipeline():
    add_one.alias("second_addition")(add_one(return_one()))
```

### Solid Tags

Similar to aliases, you can also define solid tags on a solid invocation.

```python file=/concepts/solids_pipelines/pipelines.py startafter=start_tag_pipeline endbefore=end_tag_pipeline
@pipeline
def tag_pipeline():
    add_one.tag({"my_tag": "my_value"})(add_one(return_one()))
```

## Pipeline Configuration

### Pipeline Modes

Pipeline definitions do not expose a configuration schema. Instead, they specify a set of <PyObject object="ModeDefinition" pluralize/> that can be used with the pipeline. For more information on Modes, see the [Modes](/concepts/modes-resources) section.

```python file=/concepts/solids_pipelines/pipelines.py startafter=start_modes_pipeline endbefore=end_modes_pipeline
dev_mode = ModeDefinition("dev")
staging_mode = ModeDefinition("staging")
prod_mode = ModeDefinition("prod")


@pipeline(mode_defs=[dev_mode, staging_mode, prod_mode])
def my_modes_pipeline():
    my_solid()
```

### Pipeline Presets

You can predefine configurations in which a pipeline can execute. <PyObject object="PresetDefinition" /> allows you to do so by specifying configuration values at the pipeline definition time.

```python file=/concepts/solids_pipelines/pipelines.py startafter=start_preset_pipeline endbefore=end_preset_pipeline
@pipeline(
    preset_defs=[
        PresetDefinition(
            name="one",
            run_config={"solids": {"add_one": {"inputs": {"number": 1}}}},
        ),
        PresetDefinition(
            name="two",
            run_config={"solids": {"add_one": {"inputs": {"number": 2}}}},
        ),
    ]
)
def my_presets_pipeline():
    add_one()
```

In Dagit, You can choose and load a preset in the Playground:

![Pipeline Presets](/images/concepts/pipelines/presets.png)

To run a pipeline in a script or in a test, You can specify a preset name via `preset` argument to the Python API <PyObject object="execute_pipeline" />:

```python file=/concepts/solids_pipelines/pipelines.py startafter=start_run_preset endbefore=end_run_preset
def run_pipeline():
    execute_pipeline(my_presets_pipeline, preset="one")
```

You can also use the Dagster CLI <PyObject module="dagster-pipeline-execute" object="--preset" displayText="dagster pipeline execute --preset" /> to run a pipeline with a preset name:

```bash
dagster pipeline execute my_presets_pipeline --preset one
```

### Pipeline Tags

Pipelines can specify a set of tags that are also automatically set on the resulting pipeline runs.

```python file=/concepts/solids_pipelines/pipelines.py startafter=start_tags_pipeline endbefore=end_tags_pipeline
@pipeline(tags={"my_tag": "my_value"})
def my_tags_pipeline():
    my_solid()
```

This pipeline defines a `my_tag` tag. Any pipeline runs created using this pipeline will also have the same tag.

## Examples

There are many DAG structures can represent using pipelines. This section covers a few basic patterns you can use to build more complex pipelines.

### Linear Dependencies

The simplest pipeline structure is the linear pipeline. We return one output from the root solid, and pass along data through single inputs and outputs.

![Linear Pipeline](/images/concepts/pipelines/linear.png)

```python file=/concepts/solids_pipelines/linear_pipeline.py startafter=start_marker endbefore=end_marker
from dagster import InputDefinition, pipeline, solid


@solid
def return_one(context):
    return 1


@solid(input_defs=[InputDefinition("number", int)])
def add_one(context, number):
    return number + 1


@pipeline
def linear_pipeline():
    add_one(add_one(add_one(return_one())))
```

### Multiple Inputs and Outputs

![Branching Pipeline](/images/concepts/pipelines/branch.png)

A single output can be passed to multiple inputs on downstream solids. In this example, the output from the first solid is passed to two different solids. The outputs of those solids are combined and passed to the final solid.

```python file=/concepts/solids_pipelines/multiple_io_pipeline.py startafter=start_marker endbefore=end_marker
from dagster import InputDefinition, Output, OutputDefinition, pipeline, solid


@solid
def return_one(context):
    return 1


@solid(input_defs=[InputDefinition("number", int)])
def add_one(context, number):
    return number + 1


@solid(
    input_defs=[
        InputDefinition(name="a", dagster_type=int),
        InputDefinition(name="b", dagster_type=int),
    ],
    output_defs=[
        OutputDefinition(name="sum", dagster_type=int),
    ],
)
def adder(context, a, b):
    yield Output(a + b, output_name="sum")


@pipeline
def inputs_and_outputs_pipeline():
    value = return_one()
    a = add_one(value)
    b = add_one(value)
    adder(a, b)
```

### Conditional Branching

<img src="/images/concepts/pipelines/branch.png" />

A solid only starts to execute once all of its inputs have been resolved. We can use this behavior to model conditional execution of solids.

In this example, the `branching_solid` outputs either the `branch_1` result or `branch_2` result. Since solid execution is skipped for solids that have unresolved inputs, only one of the downstream solids will execute.

```python file=/concepts/solids_pipelines/branching_pipeline.py startafter=start_marker endbefore=end_marker
import random

from dagster import InputDefinition, Output, OutputDefinition, pipeline, solid


@solid(
    output_defs=[
        OutputDefinition(int, "branch_1", is_required=False),
        OutputDefinition(int, "branch_2", is_required=False),
    ]
)
def branching_solid(_):
    num = random.randint(0, 1)
    if num == 0:
        yield Output(1, "branch_1")
    else:
        yield Output(2, "branch_2")


@solid(input_defs=[InputDefinition("_input", int)])
def branch_1_solid(_, _input):
    pass


@solid(input_defs=[InputDefinition("_input", int)])
def branch_2_solid(_, _input):
    pass


@pipeline
def branching_pipeline():
    branch_1, branch_2 = branching_solid()
    branch_1_solid(branch_1)
    branch_2_solid(branch_2)
```

### Fixed Fan-in Pipeline

If you have a fixed set of solids that all return the same output type, you can collect all the outputs into a list and pass them into a single downstream solid.

The downstream solid executes only if all of the outputs were successfully created by the upstream solids.

```python file=/concepts/solids_pipelines/fan_in_pipeline.py startafter=start_marker endbefore=end_marker
from dagster import InputDefinition, List, OutputDefinition, pipeline, solid


@solid(output_defs=[OutputDefinition(int)])
def return_one(_):
    return 1


@solid(input_defs=[InputDefinition("nums", List[int])], output_defs=[OutputDefinition(int)])
def sum_fan_in(_, nums):
    return sum(nums)


@pipeline
def fan_in_pipeline():
    fan_outs = []
    for i in range(0, 10):
        fan_outs.append(return_one.alias("return_one_{}".format(i))())
    sum_fan_in(fan_outs)
```

In this example, we have 10 solids that all output the number `1`. The `sum_fan_in` solid takes all of these outputs as a list and sums them.

### Dynamic Mapping & Collect <Experimental/>

In most cases, the structure of a pipeline is pre-determined before execution. Dagster now has experimental support for creating pipelines where the final structure is not determined until run-time. This is useful for pipeline structures where you want to execute a separate instance of a solid for each entry in a certain output.

In this example we have a solid `files_in_directory` that defines a <PyObject object="DynamicOutputDefinition" module="dagster.experimental"/>. We `map` over the dynamic output which will cause the downstream dependencies to be cloned for each <PyObject object="DynamicOutput"  module="dagster.experimental"/> that is yielded. The downstream copies can be identified by the `mapping_key` supplied to <PyObject object="DynamicOutput"  module="dagster.experimental"/>. Once that's all complete, we `collect` the over results of `process_file` and pass that in to `summarize_directory`.

```python file=/concepts/solids_pipelines/dynamic_pipeline/dynamic_pipeline.py startafter=start_marker endbefore=end_marker
import os
from typing import List

from dagster import Field, pipeline, solid
from dagster.experimental import DynamicOutput, DynamicOutputDefinition
from dagster.utils import file_relative_path


@solid(
    config_schema={"path": Field(str, default_value=file_relative_path(__file__, "sample"))},
    output_defs=[DynamicOutputDefinition(str)],
)
def files_in_directory(context):
    path = context.solid_config["path"]
    dirname, _, filenames = next(os.walk(path))
    for file in filenames:
        yield DynamicOutput(
            value=os.path.join(dirname, file),
            # create a mapping key from the file name
            mapping_key=file.replace(".", "_").replace("-", "_"),
        )


@solid
def process_file(_, path: str) -> int:
    # simple example of calculating size
    return os.path.getsize(path)


@solid
def summarize_directory(_, sizes: List[int]) -> int:
    # simple example of totalling sizes
    return sum(sizes)


@pipeline
def process_directory():
    file_results = files_in_directory().map(process_file)
    summarize_directory(file_results.collect())
```

### Order-based Dependencies (Nothing dependencies)

<img src="/images/concepts/pipelines/linear.png" />

Dependencies in Dagster are primarily _data dependencies_. Using data dependencies means each input of a solid depends on the output of an upstream solid.

If you have a solid, say `Solid A`, that does not depend on any outputs of another solid, say `Solid B`, there theoretically shouldn't be a reason for `Solid A` to run after `Solid B`. In most cases, these two solids should be parallelizable. However, there are some cases where an explicit ordering is required, but it doesn't make sense to pass data through inputs and outputs to model the dependency.

If you need to model an explicit ordering dependency, you can use the <PyObject object="Nothing"/> Dagster type on the input definition of the downstream solid. This type specifies that you are passing "nothing" via Dagster between the solids, while still uses inputs and outputs to model the dependency between the two solids.

```python file=/concepts/solids_pipelines/order_based_dependency_pipeline.py startafter=start_marker endbefore=end_marker
from dagster import InputDefinition, Nothing, pipeline, solid


@solid
def create_table_1(_context) -> Nothing:
    get_database_connection().execute("create table_1 as select * from some_source_table")


@solid(input_defs=[InputDefinition("start", Nothing)])
def create_table_2(_context):
    get_database_connection().execute("create table_2 as select * from table_1")


@pipeline
def nothing_dependency_pipeline():
    create_table_2(create_table_1())
```

In this example, `create_table_1` returns an output of type <PyObject object="Nothing"/>, and `create_table_2` has an input of type `Nothing`. This lets us connect them in the pipeline definition so that `create_table_2` executes only after `create_table_1` successfully executes.

Note that in most cases, it is usually possible to pass some data dependency. In the example above, even though we probably don't want to pass the table data itself between the solids, we could pass table pointers. For example, `create_table_1` could return a `table_pointer` output of type `str` with a value of `table_1`, and this table name can be used in `create_table_2` to more accurately model the data dependency.

Dagster also provides more advanced abstractions to handle dependencies and IO. If you find that you are finding it difficult to model data dependencies when using external storages, check out [IOManagers](/concepts/io-management/io-managers).

## Patterns

### Constructing PipelineDefinitions

You may run into a situation where you need to programmatically construct the dependency graph for a pipeline. In that case, you can directly define the <PyObject module="dagster" object="PipelineDefinition"/> object.

To construct a PipelineDefinition, you need to pass the constructor a pipeline name, a list of solid definitions, and a dictionary defining the dependency structure. The dependency structure declares the dependencies of each solid’s inputs on the outputs of other solids in the pipeline. The top-level keys of the dependency dictionary are the string names of solids. If you are using solid aliases be sure to use the aliased name. Values of the top-level keys are also dictionary, which maps input names to a <PyObject object="DependencyDefinition"/>.

```python file=/concepts/solids_pipelines/pipelines.py startafter=start_pipeline_definition_marker endbefore=end_pipeline_definition_marker
one_plus_one_pipeline_def = PipelineDefinition(
    name="one_plus_one_pipeline",
    solid_defs=[return_one, add_one],
    dependencies={"add_one": {"number": DependencyDefinition("return_one")}},
)
```

### Pipeline DSL

Sometimes you may want to construct the dependencies of a pipeline definition from a YAML file or similar. This is useful when migrating to Dagster from other workflow systems.

For example, you can have a YAML like this:

```YAML
pipeline:
  name: some_example
  description: blah blah blah
  solids:
    - def: add_one
      alias: A
    - def: add_one
      alias: B
      deps:
        num:
          solid: A
    - def: add_two
      alias: C
      deps:
        num:
          solid: A
    - def: subtract
      deps:
        left:
          solid: B
        right:
          solid: C
```

You can programatically generate a PipelineDefinition from this YAML:

```python
@solid
def add_one(_, num: int) -> int:
    return num + 1


@solid
def add_two(_, num: int) -> int:
    return num + 2


@solid
def subtract(_, left: int, right: int) -> int:
    return left + right


def construct_pipeline_with_yaml(yaml_file, solid_defs):
    yaml_data = load_yaml_from_path(yaml_file)
    solid_def_dict = {s.name: s for s in solid_defs}

    deps = {}

    for solid_yaml_data in yaml_data["pipeline"]["solids"]:
        check.invariant(solid_yaml_data["def"] in solid_def_dict)
        def_name = solid_yaml_data["def"]
        alias = solid_yaml_data.get("alias", def_name)
        solid_deps_entry = {}
        for input_name, input_data in solid_yaml_data.get("deps", {}).items():
            solid_deps_entry[input_name] = DependencyDefinition(
                solid=input_data["solid"], output=input_data.get("output", "result")
            )
        deps[SolidInvocation(name=def_name, alias=alias)] = solid_deps_entry

    return PipelineDefinition(
        name=yaml_data["pipeline"]["name"],
        description=yaml_data["pipeline"].get("description"),
        solid_defs=solid_defs,
        dependencies=deps,
    )


def define_dep_dsl_pipeline():
    return construct_pipeline_with_yaml(
        file_relative_path(__file__, "example.yaml"), [add_one, add_two, subtract]
    )


@repository
def define_repository():
    return {"pipelines": {"some_example": define_dep_dsl_pipeline}}
```
